/*
Copyright 2022 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package operationexecutor

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/resource"
	clientset "k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/record"
	"k8s.io/klog/v2"
	kevents "k8s.io/kubernetes/pkg/kubelet/events"
	"k8s.io/kubernetes/pkg/volume/util"
	volumetypes "k8s.io/kubernetes/pkg/volume/util/types"
)

// NodeExpander bundles the inputs and intermediate state used to expand a volume
// on the node via the volume plugin and to record the outcome on the PVC.
type NodeExpander struct {
	// nodeResizeOperationOpts is embedded so its fields (the PVC, PV, volume to mount,
	// plugin and plugin resize options used by the methods below) are directly accessible.
	nodeResizeOperationOpts
	// kubeClient is handed to the util helpers that update the PVC.
	kubeClient clientset.Interface
	// recorder emits events on the pod and the PVC after a successful node expansion.
	recorder   record.EventRecorder

	// The following fields are computed by runPreCheck.

	// pvcStatusCap is the storage capacity read from the PVC's status.
	pvcStatusCap resource.Quantity
	// pvCap is the storage capacity read from the PV's spec.
	pvCap        resource.Quantity
	// resizeStatus is the resize status read from the PVC's status; it may be nil.
	resizeStatus *v1.PersistentVolumeClaimResizeStatus

	// pvcAlreadyUpdated, if true, indicates that although we are calling NodeExpandVolume on the kubelet,
	// the PVC has already been updated - possibly because expansion already succeeded on a different node.
	// This can happen when a RWX PVC is expanded.
	pvcAlreadyUpdated bool
}

// newNodeExpander builds a NodeExpander from the given resize options, API client
// and event recorder. Fields computed by the pre-check are left at their zero values.
func newNodeExpander(resizeOp nodeResizeOperationOpts, client clientset.Interface, recorder record.EventRecorder) *NodeExpander {
	expander := new(NodeExpander)
	expander.nodeResizeOperationOpts = resizeOp
	expander.kubeClient = client
	expander.recorder = recorder
	return expander
}

// testResponseData is returned alongside the real results of an expansion attempt
// and is merely used for doing sanity checks in unit tests.
type testResponseData struct {
	// resizeCalledOnPlugin indicates that the resize operation was called on the
	// underlying volume driver. Mainly useful for testing.
	resizeCalledOnPlugin bool

	// assumeResizeFinished indicates whether kubelet should treat the resize operation
	// as finished. For kubelet, a resize operation can be assumed finished even if the
	// actual resizing is *not* finished. This can happen because certain prechecks
	// are failing and kubelet should not retry expansion, or because the resize
	// operation has genuinely finished.
	assumeResizeFinished bool
}

// runPreCheck caches the PVC/PV capacities and the PVC resize status on the expander
// and reports whether node expansion may proceed. It returns true when the resize
// status is unset, when the PVC is considered already updated, or when the status is
// NodeExpansionPending or NodeExpansionInProgress; otherwise it returns false.
func (ne *NodeExpander) runPreCheck() bool {
	ne.pvcStatusCap = ne.pvc.Status.Capacity[v1.ResourceStorage]
	ne.pvCap = ne.pv.Spec.Capacity[v1.ResourceStorage]
	ne.resizeStatus = ne.pvc.Status.ResizeStatus

	status := ne.resizeStatus

	// The PVC may already report the requested size while we are still asked to expand,
	// because the last size recorded in ASOW is older. This can happen for RWX volume types.
	capacityReached := ne.pvcStatusCap.Cmp(ne.pluginResizeOpts.NewSize) >= 0
	noExpansionRecorded := status == nil || *status == v1.PersistentVolumeClaimNoExpansionInProgress
	if capacityReached && noExpansionRecorded {
		ne.pvcAlreadyUpdated = true
	}

	// A nil status is deliberately allowed so that setups where resizeStatus is never
	// set (old control-plane expansion controller etc.) keep working.
	if status == nil || ne.pvcAlreadyUpdated {
		return true
	}

	switch *status {
	case v1.PersistentVolumeClaimNodeExpansionPending, v1.PersistentVolumeClaimNodeExpansionInProgress:
		return true
	default:
		return false
	}
}

// expandOnPlugin runs the pre-check and, if it passes, calls NodeExpand on the volume
// plugin and records the outcome on the PVC.
//
// Return values, in order:
//   - bool: true only when the plugin's NodeExpand call succeeded. It stays true even
//     if the subsequent PVC status update fails.
//   - error: nil when the pre-check rejected expansion, when the plugin returned a
//     failed-precondition error (expansion must not block mounting), or on full success;
//     otherwise the error from marking the PVC, from the plugin, or from the final PVC update.
//   - testResponseData: flags used for sanity checks in unit tests.
func (ne *NodeExpander) expandOnPlugin() (bool, error, testResponseData) {
	if !ne.runPreCheck() {
		// The plugin is not called, but the operation is reported as finished.
		return false, nil, testResponseData{resizeCalledOnPlugin: false, assumeResizeFinished: true}
	}

	var err error
	nodeName := ne.vmt.Pod.Spec.NodeName

	if !ne.pvcAlreadyUpdated {
		ne.pvc, err = util.MarkNodeExpansionInProgress(ne.pvc, ne.kubeClient)
		if err != nil {
			// NOTE(review): the message is passed to GenerateErrorDetailed unchanged, including
			// the trailing "%v" - confirm whether that helper treats it as a format string.
			msg := ne.vmt.GenerateErrorDetailed("MountVolume.NodeExpandVolume failed to mark node expansion in progress: %v", err)
			// Log the text as-is; it must not be reinterpreted as a format string.
			klog.Error(msg.Error())
			return false, err, testResponseData{}
		}
	}

	_, resizeErr := ne.volumePlugin.NodeExpand(ne.pluginResizeOpts)
	if resizeErr != nil {
		if volumetypes.IsOperationFinishedError(resizeErr) {
			var markFailedError error
			ne.pvc, markFailedError = util.MarkNodeExpansionFailed(ne.pvc, ne.kubeClient)
			if markFailedError != nil {
				// Report the error from marking the PVC, not the unrelated outer err.
				klog.Error(ne.vmt.GenerateErrorDetailed("MountVolume.NodeExpandVolume failed to mark node expansion as failed: %v", markFailedError).Error())
			}
		}

		pluginFailed := testResponseData{resizeCalledOnPlugin: true, assumeResizeFinished: true}

		// if driver returned FailedPrecondition error that means
		// volume expansion should not be retried on this node but
		// expansion operation should not block mounting
		if volumetypes.IsFailedPreconditionError(resizeErr) {
			ne.actualStateOfWorld.MarkForInUseExpansionError(ne.vmt.VolumeName)
			klog.Error(ne.vmt.GenerateErrorDetailed("MountVolume.NodeExpandVolume failed with %v", resizeErr).Error())
			return false, nil, pluginFailed
		}
		return false, resizeErr, pluginFailed
	}

	simpleMsg, detailedMsg := ne.vmt.GenerateMsg("MountVolume.NodeExpandVolume succeeded", nodeName)
	// Event (not Eventf): simpleMsg is a finished message, not a format string.
	ne.recorder.Event(ne.vmt.Pod, v1.EventTypeNormal, kevents.FileSystemResizeSuccess, simpleMsg)
	ne.recorder.Event(ne.pvc, v1.EventTypeNormal, kevents.FileSystemResizeSuccess, simpleMsg)
	klog.InfoS(detailedMsg, "pod", klog.KObj(ne.vmt.Pod))

	resized := testResponseData{resizeCalledOnPlugin: true, assumeResizeFinished: true}

	// no need to update PVC object if we already updated it
	if ne.pvcAlreadyUpdated {
		return true, nil, resized
	}

	// File system resize succeeded, now update the PVC's Capacity to match the PV's
	ne.pvc, err = util.MarkFSResizeFinished(ne.pvc, ne.pluginResizeOpts.NewSize, ne.kubeClient)
	if err != nil {
		return true, fmt.Errorf("mountVolume.NodeExpandVolume update pvc status failed: %w", err), resized
	}
	return true, nil, resized
}
